#
# Copyright 2014-2015 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
from hashlib import md5
import itertools
import json
import operator
import re
import threading
import uuid

from gnocchiclient import exceptions as gnocchi_exc
from gnocchiclient import utils as gnocchi_utils
from keystoneauth1 import exceptions as ka_exceptions
from oslo_config import cfg
from oslo_log import log
from oslo_utils import fnmatch
from oslo_utils import timeutils
import retrying
import six
from stevedore import extension

from ceilometer import declarative
from ceilometer import dispatcher
from ceilometer.i18n import _, _LE, _LW
from ceilometer import gnocchi_client
from ceilometer import keystone_client

# Namespace UUID for cache keys. The MD5 digest of this module's dotted name
# supplies the 16 raw bytes, so the namespace is identical in every process
# and across restarts. cache_key_mangler derives keys under it.
NAME_ENCODED = __name__.encode('utf-8')
CACHE_NAMESPACE = uuid.UUID(bytes=md5(NAME_ENCODED).digest())
LOG = log.getLogger(__name__)

# Options read from the [dispatcher_gnocchi] configuration section.
dispatcher_opts = [
    cfg.BoolOpt('filter_service_activity',
                default=True,
                help='Filter out samples generated by Gnocchi '
                'service activity'),
    # Keystone project *name*; the dispatcher resolves it to an id lazily.
    cfg.StrOpt('filter_project',
               default='gnocchi',
               help='Gnocchi project used to filter out samples '
               'generated by Gnocchi service activity'),
    # When unset (and a resource definition sets no archive_policy of its
    # own), metrics are declared without an explicit archive_policy_name.
    cfg.StrOpt('archive_policy',
               help='The archive policy to use when the dispatcher '
               'create a new metric.'),
    cfg.StrOpt('resources_definition_file',
               default='gnocchi_resources.yaml',
               help=_('The Yaml file that defines mapping between samples '
                      'and gnocchi resources/metrics')),
]

# Registration happens at import time on the global configuration object.
cfg.CONF.register_opts(dispatcher_opts, group="dispatcher_gnocchi")


def cache_key_mangler(key):
    """Map a cache key to an opaque hex string.

    The key becomes the name of a version-5 (SHA-1, name-based) UUID in
    CACHE_NAMESPACE. The result is deterministic and has a fixed length.
    On Python 2 the key is UTF-8 encoded first, because uuid5 wants a byte
    string there.
    """
    name = key.encode('utf-8') if six.PY2 else key
    mangled = uuid.uuid5(CACHE_NAMESPACE, name)
    return mangled.hex


# Operation names returned by ResourcesDefinition.event_match().
EVENT_CREATE = "create"
EVENT_UPDATE = "update"
EVENT_DELETE = "delete"


class ResourcesDefinition(object):
    """One resource mapping loaded from the resources definition file.

    Declares which sample metrics (and, optionally, which notification
    events) map onto a Gnocchi resource type. Also declares how resource
    attributes are extracted from samples and from event traits.
    """

    # Fields every definition must carry, with the type they must have.
    MANDATORY_FIELDS = {'resource_type': six.string_types,
                        'metrics': list}

    def __init__(self, definition_cfg, default_archive_policy, plugin_manager):
        """Validate ``definition_cfg`` and precompute attributes/metrics.

        :param definition_cfg: one entry of the ``resources`` list of the
                               definitions file.
        :param default_archive_policy: archive policy name used when the
                                       entry has no ``archive_policy`` key;
                                       may be None.
        :param plugin_manager: trait plugin manager passed through to
                               ``declarative.Definition``.
        :raises declarative.ResourceDefinitionException: if a mandatory
                field is missing or has the wrong type.
        """
        self._default_archive_policy = default_archive_policy
        self.cfg = definition_cfg

        for field, field_type in self.MANDATORY_FIELDS.items():
            # Exception text is user facing, so it uses the plain `_`
            # marker; the _LE/_LW markers are reserved for log messages.
            if field not in self.cfg:
                raise declarative.ResourceDefinitionException(
                    _("Required field %s not specified") % field, self.cfg)
            if not isinstance(self.cfg[field], field_type):
                raise declarative.ResourceDefinitionException(
                    _("Required field %(field)s should be a %(type)s") %
                    {'field': field, 'type': field_type}, self.cfg)

        self._attributes = {}
        for name, attr_cfg in self.cfg.get('attributes', {}).items():
            self._attributes[name] = declarative.Definition(name, attr_cfg,
                                                            plugin_manager)

        # The archive policy does not depend on the metric, so resolve it
        # once. Every metric still gets its own dict, because the dispatcher
        # later writes a per-metric 'unit' key into these dicts.
        archive_policy = self.cfg.get('archive_policy',
                                      self._default_archive_policy)
        self.metrics = {}
        for t in self.cfg['metrics']:
            if archive_policy is None:
                self.metrics[t] = {}
            else:
                self.metrics[t] = dict(archive_policy_name=archive_policy)

    @staticmethod
    def _ensure_list(value):
        """Return ``value`` unchanged if it is a list, else wrap it in one."""
        if isinstance(value, list):
            return value
        return [value]

    def metric_match(self, metric_name):
        """Tell whether ``metric_name`` matches any configured metric pattern.

        Patterns are shell-style globs matched with ``fnmatch``.
        """
        return any(fnmatch.fnmatch(metric_name, t)
                   for t in self.cfg['metrics'])

    @property
    def support_events(self):
        """True if the definition declares any create/delete/update event."""
        return any(e in self.cfg
                   for e in ["event_create", "event_delete", "event_update"])

    def event_match(self, event_type):
        """Map ``event_type`` to EVENT_CREATE/EVENT_DELETE/EVENT_UPDATE.

        Each ``event_*`` key may hold one glob pattern or a list of them.
        Create patterns are tried first, then delete, then update. Returns
        None when nothing matches.
        """
        for e in self._ensure_list(self.cfg.get('event_create', [])):
            if fnmatch.fnmatch(event_type, e):
                return EVENT_CREATE
        for e in self._ensure_list(self.cfg.get('event_delete', [])):
            if fnmatch.fnmatch(event_type, e):
                return EVENT_DELETE
        for e in self._ensure_list(self.cfg.get('event_update', [])):
            if fnmatch.fnmatch(event_type, e):
                return EVENT_UPDATE

    def sample_attributes(self, sample):
        """Extract the configured resource attributes from ``sample``.

        Attributes whose definition parses to None are omitted.
        """
        attrs = {}
        for name, definition in self._attributes.items():
            value = definition.parse(sample)
            if value is not None:
                attrs[name] = value
        return attrs

    def event_attributes(self, event):
        """Build resource attributes from the traits of ``event``.

        ``event['traits']`` items are indexed as ``trait[0]`` (name) and
        ``trait[2]`` (value). ``event_attributes`` in the definition maps a
        resource attribute name to a trait name. Traits that are missing or
        None are skipped.
        """
        attrs = {}
        traits = {trait[0]: trait[2] for trait in event['traits']}
        for attr, field in self.cfg.get('event_attributes', {}).items():
            value = traits.get(field)
            if value is not None:
                attrs[attr] = value
        return attrs


class LockedDefaultDict(defaultdict):
    """A defaultdict whose lookups and removals are serialised by a lock.

    Item access holds ``self.lock``, so concurrent lookups of a missing key
    build exactly one default value. ``pop`` removes an entry only if it
    can take that entry's value (expected to be a lock) without blocking.
    When the entry's lock is currently held, the entry is left in place.
    """
    def __init__(self, *args, **kwargs):
        self.lock = threading.Lock()
        super(LockedDefaultDict, self).__init__(*args, **kwargs)

    def __getitem__(self, key):
        with self.lock:
            value = super(LockedDefaultDict, self).__getitem__(key)
        return value

    def pop(self, key, *args):
        """Drop ``key`` unless its lock is held elsewhere; returns None.

        A missing key first gets a default value (defaultdict semantics),
        which is then removed right away.
        """
        parent = super(LockedDefaultDict, self)
        with self.lock:
            entry_lock = parent.__getitem__(key)
            if not entry_lock.acquire(False):
                # Someone is still using this key's lock: keep the entry.
                return
            try:
                parent.pop(key, *args)
            finally:
                entry_lock.release()


class GnocchiDispatcher(dispatcher.MeterDispatcherBase):
    """Dispatcher recording metering data and resource events into Gnocchi.

    The dispatcher class records each meter into the gnocchi service
    configured in ceilometer configuration file. An example configuration may
    look like the following:

    [dispatcher_gnocchi]
    url = http://localhost:8041
    archive_policy = low

    To enable this dispatcher, the following section needs to be present in
    ceilometer.conf file

    [DEFAULT]
    meter_dispatchers = gnocchi

    Samples are grouped per resource and metric and posted in one batch.
    Missing resources and metrics are created on demand. Delete events
    mark the matching resources (and configured associated resources) as
    ended.
    """
    def __init__(self, conf):
        super(GnocchiDispatcher, self).__init__(conf)
        self.conf = conf
        self.filter_service_activity = (
            conf.dispatcher_gnocchi.filter_service_activity)
        self._ks_client = keystone_client.get_client()
        self.resources_definition = self._load_resources_definitions(conf)

        # Optional resource-attribute cache. It stays None when oslo_cache
        # is unavailable, fails to configure, or only a null backend is set.
        self.cache = None
        try:
            import oslo_cache
            oslo_cache.configure(self.conf)
            # NOTE(cdent): The default cache backend is a real but
            # noop backend. We don't want to use that here because
            # we want to avoid the cache pathways entirely if the
            # cache has not been configured explicitly.
            if 'null' not in self.conf.cache.backend:
                cache_region = oslo_cache.create_region()
                self.cache = oslo_cache.configure_cache_region(
                    self.conf, cache_region)
                self.cache.key_mangler = cache_key_mangler
        except ImportError:
            pass
        except oslo_cache.exception.ConfigurationError as exc:
            LOG.warning(_LW('unable to configure oslo_cache: %s') % exc)

        self._gnocchi_project_id = None
        self._gnocchi_project_id_lock = threading.Lock()
        # One lock per resource id, created on first use. It serialises the
        # cache check + create/update of a resource between threads.
        self._gnocchi_resource_lock = LockedDefaultDict(threading.Lock)

        self._gnocchi = gnocchi_client.get_gnocchiclient(conf)

        # Convert retry_interval secs to msecs for retry decorator
        retries = conf.storage.max_retries

        # A negative max_retries is passed as None (no attempt limit).
        @retrying.retry(wait_fixed=conf.storage.retry_interval * 1000,
                        stop_max_attempt_number=(retries if retries >= 0
                                                 else None))
        def _get_connection():
            self._gnocchi.capabilities.list()

        try:
            _get_connection()
        except Exception:
            LOG.error(_LE('Failed to connect to Gnocchi.'))
            raise

    @classmethod
    def _load_resources_definitions(cls, conf):
        """Load every valid ResourcesDefinition from the definitions file.

        Entries that fail validation are logged and skipped.
        """
        plugin_manager = extension.ExtensionManager(
            namespace='ceilometer.event.trait_plugin')
        data = declarative.load_definitions(
            {}, conf.dispatcher_gnocchi.resources_definition_file)
        resource_defs = []
        for resource in data.get('resources', []):
            try:
                resource_defs.append(ResourcesDefinition(
                    resource,
                    conf.dispatcher_gnocchi.archive_policy, plugin_manager))
            except Exception as exc:
                LOG.error(_LE("Failed to load resource due to error %s") %
                          exc)
        return resource_defs

    @property
    def gnocchi_project_id(self):
        """Keystone id of the project Gnocchi uses, resolved lazily.

        Returns None (and turns filter_service_activity off) when no
        project of that name exists. Other lookup errors are re-raised.
        """
        if self._gnocchi_project_id is not None:
            return self._gnocchi_project_id
        with self._gnocchi_project_id_lock:
            # Re-check under the lock: another thread may have resolved it.
            if self._gnocchi_project_id is None:
                try:
                    project = self._ks_client.projects.find(
                        name=self.conf.dispatcher_gnocchi.filter_project)
                except ka_exceptions.NotFound:
                    LOG.warning(_LW('gnocchi project not found in keystone,'
                                    ' ignoring the filter_service_activity '
                                    'option'))
                    self.filter_service_activity = False
                    return None
                except Exception:
                    LOG.exception('fail to retrieve user of Gnocchi service')
                    raise
                self._gnocchi_project_id = project.id
                # Read the attribute directly rather than re-entering the
                # property while holding the lock.
                LOG.debug("gnocchi project found: %s",
                          self._gnocchi_project_id)
            return self._gnocchi_project_id

    def _is_swift_account_sample(self, sample):
        """True if the sample's metric belongs to a swift_account definition."""
        return any(rd.cfg['resource_type'] == 'swift_account'
                   and rd.metric_match(sample['counter_name'])
                   for rd in self.resources_definition)

    def _is_gnocchi_activity(self, sample):
        """Truthy when ``sample`` was generated by Gnocchi's own activity."""
        return (self.filter_service_activity and self.gnocchi_project_id and (
            # avoid anything from the user used by gnocchi
            sample['project_id'] == self.gnocchi_project_id or
            # avoid anything in the swift account used by gnocchi
            (sample['resource_id'] == self.gnocchi_project_id and
             self._is_swift_account_sample(sample))
        ))

    def _get_resource_definition_from_metric(self, metric_name):
        """Return the first definition matching ``metric_name``, or None."""
        for rd in self.resources_definition:
            if rd.metric_match(metric_name):
                return rd

    def _get_resource_definition_from_event(self, event_type):
        """Return ``(definition, operation)`` for ``event_type``, or None."""
        for rd in self.resources_definition:
            operation = rd.event_match(event_type)
            if operation:
                return rd, operation

    def record_metering_data(self, data):
        """Post samples as Gnocchi measures, then refresh resource attributes.

        Samples generated by Gnocchi itself are dropped. The rest are
        grouped by resource id and metric name and mapped to a resource
        definition. They are posted in one batch through batch_measures.
        Resources with extra attributes are then updated, subject to the
        resource cache. Errors while posting measures or updating
        resources are logged, not re-raised.

        :param data: a single sample dict or a list of sample dicts.
        """
        # We may have receive only one counter on the wire
        if not isinstance(data, list):
            data = [data]
        # NOTE(sileht): skip sample generated by gnocchi itself
        data = [s for s in data if not self._is_gnocchi_activity(s)]

        # FIXME(sileht): This method bulk the processing of samples
        # grouped by resource_id and metric_name but this is not
        # efficient yet because the data received here doesn't often
        # contains a lot of different kind of samples
        # So perhaps the next step will be to pool the received data from
        # message bus.
        data.sort(key=lambda s: (s['resource_id'], s['counter_name']))

        resource_grouped_samples = itertools.groupby(
            data, key=operator.itemgetter('resource_id'))

        gnocchi_data = {}
        measures = {}
        stats = dict(measures=0, resources=0, metrics=0)
        for resource_id, samples_of_resource in resource_grouped_samples:
            stats['resources'] += 1
            metric_grouped_samples = itertools.groupby(
                list(samples_of_resource),
                key=operator.itemgetter('counter_name'))

            # NOTE(sileht): We convert resource id to Gnocchi format
            # because batch_resources_metrics_measures exception
            # returns this id and not the ceilometer one
            gnocchi_id = gnocchi_utils.encode_resource_id(resource_id)
            res_info = {}
            for metric_name, samples in metric_grouped_samples:
                stats['metrics'] += 1

                samples = list(samples)
                rd = self._get_resource_definition_from_metric(metric_name)
                if rd is None:
                    LOG.warning(_LW("metric %s is not handled by Gnocchi") %
                                metric_name)
                    continue
                if rd.cfg.get("ignore"):
                    continue

                res_info['resource_type'] = rd.cfg['resource_type']
                res_info.setdefault("resource", {}).update({
                    "id": resource_id,
                    "user_id": samples[0]['user_id'],
                    "project_id": samples[0]['project_id'],
                    "metrics": rd.metrics,
                })

                for sample in samples:
                    res_info.setdefault("resource_extra", {}).update(
                        rd.sample_attributes(sample))
                    m = measures.setdefault(gnocchi_id, {}).setdefault(
                        metric_name, [])
                    m.append({'timestamp': sample['timestamp'],
                              'value': sample['counter_volume']})
                    unit = sample['counter_unit']
                    metric = sample['counter_name']
                    # NOTE(review): this writes 'unit' into the dict shared
                    # with rd.metrics. It also raises KeyError when the
                    # metric matched a wildcard pattern rather than being
                    # listed verbatim in the definition. Confirm intended.
                    res_info['resource']['metrics'][metric]['unit'] = unit

                stats['measures'] += len(measures[gnocchi_id][metric_name])
                res_info["resource"].update(res_info["resource_extra"])
                if res_info:
                    gnocchi_data[gnocchi_id] = res_info

        try:
            self.batch_measures(measures, gnocchi_data, stats)
        except gnocchi_exc.ClientException as e:
            LOG.error(six.text_type(e))
        except Exception as e:
            LOG.error(six.text_type(e), exc_info=True)

        for gnocchi_id, info in gnocchi_data.items():
            resource = info["resource"]
            resource_type = info["resource_type"]
            resource_extra = info["resource_extra"]
            if not resource_extra:
                continue
            try:
                self._if_not_cached("update", resource_type, resource,
                                    self._update_resource, resource_extra)
            except gnocchi_exc.ClientException as e:
                LOG.error(six.text_type(e))
            except Exception as e:
                LOG.error(six.text_type(e), exc_info=True)

    # Raw strings: "\(" is not a valid str escape sequence.
    RE_UNKNOW_METRICS = re.compile(r"Unknown metrics: (.*) \(HTTP 400\)")
    RE_UNKNOW_METRICS_LIST = re.compile(r"([^/ ,]*)/([^,]*)")

    def batch_measures(self, measures, resource_infos, stats):
        """Post ``measures`` in one batch, creating what Gnocchi lacks.

        If Gnocchi answers with an "Unknown metrics" BadRequest, each listed
        resource is created (or, if it already exists, the metric alone is
        created). The batch is then posted again. Measures for resources or
        metrics that could not be created are dropped from ``measures``.
        Any other BadRequest is re-raised.

        :param measures: {gnocchi_id: {metric_name: [measure, ...]}};
                         mutated when entries must be dropped.
        :param resource_infos: {gnocchi_id: res_info} as built by
                               record_metering_data.
        :param stats: counters used only for the debug summary.
        """
        # NOTE(sileht): We don't care about error here, we want
        # resources metadata always been updated
        try:
            self._gnocchi.metric.batch_resources_metrics_measures(measures)
        except gnocchi_exc.BadRequest as e:
            m = self.RE_UNKNOW_METRICS.match(six.text_type(e))
            if m is None:
                raise

            # NOTE(sileht): Create all missing resources and metrics
            metric_list = self.RE_UNKNOW_METRICS_LIST.findall(m.group(1))
            gnocchi_ids_freshly_handled = set()
            for gnocchi_id, metric_name in metric_list:
                if gnocchi_id in gnocchi_ids_freshly_handled:
                    continue
                resource = resource_infos[gnocchi_id]['resource']
                resource_type = resource_infos[gnocchi_id]['resource_type']
                try:
                    self._if_not_cached("create", resource_type, resource,
                                        self._create_resource)
                except gnocchi_exc.ResourceAlreadyExists:
                    metric = {'resource_id': resource['id'],
                              'name': metric_name}
                    metric.update(resource["metrics"][metric_name])
                    try:
                        self._gnocchi.metric.create(metric)
                    except gnocchi_exc.NamedMetricAlreadyExists:
                        # NOTE(sileht): metric created in the meantime
                        pass
                    except gnocchi_exc.ClientException as e:
                        LOG.error(six.text_type(e))
                        # We cannot post measures for this metric
                        del measures[gnocchi_id][metric_name]
                        if not measures[gnocchi_id]:
                            del measures[gnocchi_id]
                except gnocchi_exc.ClientException as e:
                    LOG.error(six.text_type(e))
                    # We cannot post measures for this resource
                    del measures[gnocchi_id]
                    gnocchi_ids_freshly_handled.add(gnocchi_id)
                else:
                    gnocchi_ids_freshly_handled.add(gnocchi_id)

            # NOTE(sileht): we have created missing resources/metrics,
            # now retry to post measures
            self._gnocchi.metric.batch_resources_metrics_measures(measures)

        # FIXME(sileht): take care of measures removed in stats
        LOG.debug("%(measures)d measures posted against %(metrics)d "
                  "metrics through %(resources)d resources", stats)

    def _create_resource(self, resource_type, resource):
        """Create ``resource`` (with its metrics) in Gnocchi."""
        self._gnocchi.resource.create(resource_type, resource)
        LOG.debug('Resource %s created', resource["id"])

    def _update_resource(self, resource_type, resource, resource_extra):
        """Patch resource ``resource['id']`` with ``resource_extra``."""
        self._gnocchi.resource.update(resource_type,
                                      resource["id"],
                                      resource_extra)
        LOG.debug('Resource %s updated', resource["id"])

    def _if_not_cached(self, operation, resource_type, resource, method,
                       *args, **kwargs):
        """Call ``method`` unless the cache says ``resource`` is unchanged.

        Without a cache, ``method`` always runs. With a cache, the attribute
        hash of ``resource`` is compared with the cached one, and compared
        again under the resource's lock. ``method`` runs, and the hash is
        stored, only on a mismatch. On a cache hit for ``operation ==
        "create"``, ResourceAlreadyExists is raised so the caller falls back
        to creating the metric alone.
        """
        if self.cache:
            cache_key = resource['id']
            attribute_hash = self._check_resource_cache(cache_key, resource)
            hit = False
            if attribute_hash:
                with self._gnocchi_resource_lock[cache_key]:
                    # NOTE(luogangyi): there is a possibility that the
                    # resource was already built in cache by another
                    # ceilometer-collector when we get the lock here.
                    attribute_hash = self._check_resource_cache(cache_key,
                                                                resource)
                    if attribute_hash:
                        method(resource_type, resource, *args, **kwargs)
                        self.cache.set(cache_key, attribute_hash)
                    else:
                        hit = True
                        LOG.debug('resource cache recheck hit for '
                                  '%s %s', operation, cache_key)
                self._gnocchi_resource_lock.pop(cache_key, None)
            else:
                hit = True
                LOG.debug('Resource cache hit for %s %s', operation, cache_key)
            if hit and operation == "create":
                raise gnocchi_exc.ResourceAlreadyExists()
        else:
            method(resource_type, resource, *args, **kwargs)

    def _check_resource_cache(self, key, resource_data):
        """Return the attribute hash if it differs from the cache, else None.

        The hash covers every item of ``resource_data`` except ``metrics``.
        NOTE(review): the values must be hashable.
        """
        cached_hash = self.cache.get(key)
        attribute_hash = hash(frozenset(filter(lambda x: x[0] != "metrics",
                                               resource_data.items())))
        if not cached_hash or cached_hash != attribute_hash:
            return attribute_hash
        else:
            return None

    def record_events(self, events):
        """Apply resource changes described by notification events.

        Only events that a definition maps to EVENT_DELETE are acted upon.
        Create and update events are matched but otherwise ignored.

        :param events: iterable of event dicts carrying ``event_type`` and
                       ``traits``.
        """
        for event in events:
            match = self._get_resource_definition_from_event(
                event['event_type'])
            if not match:
                LOG.debug("No gnocchi definition for event type: %s",
                          event['event_type'])
                continue

            rd, operation = match
            if operation == EVENT_DELETE:
                self._delete_event(rd, event)

    def _delete_event(self, rd, event):
        """Set ``ended_at`` on the deleted resource and its associates.

        The main resource is built from the event traits and is ended using
        the definition's ``resource_type``. Each associated resource is
        ended using the resource type it was searched under.
        """
        ended_at = timeutils.utcnow().isoformat()
        resource = rd.event_attributes(event)
        resource_id = resource.get('id')
        if resource_id is None:
            LOG.warning(_LW("Delete event %s does not provide a resource "
                            "id, ignore it."), event['event_type'])
            return

        to_end = [(rd.cfg['resource_type'], resource)]
        associated = rd.cfg.get('event_associated_resources') or {}
        for resource_type, filters in associated.items():
            # NOTE(review): accepts {'query': template} (what this code
            # originally expected) or a bare template string; confirm
            # against the definitions file.
            if isinstance(filters, dict):
                query_template = filters.get('query')
            else:
                query_template = filters
            for found in self._search_resource(resource_type, query_template,
                                               resource_id):
                to_end.append((resource_type, found))

        for resource_type, res in to_end:
            self._set_ended_at(resource_type, res, ended_at)

    def _search_resource(self, resource_type, query_template, resource_id):
        """Return resources of ``resource_type`` matching a templated query.

        ``query_template`` is %-formatted with ``resource_id``. A string
        result is decoded as JSON, because the client's search takes a
        query mapping. Any failure is logged and yields an empty list, so
        the main resource is still ended.
        """
        try:
            query = query_template % resource_id
            if isinstance(query, six.string_types):
                query = json.loads(query)
            return self._gnocchi.resource.search(resource_type=resource_type,
                                                 query=query)
        except Exception:
            LOG.error(_LE("Fail to search resource type %(resource_type)s "
                          "with '%(query)s'"),
                      {'resource_type': resource_type,
                       'query': query_template},
                      exc_info=True)
            return []

    def _set_ended_at(self, resource_type, resource, ended_at):
        """Set ``ended_at`` on one resource; log instead of raising."""
        try:
            self._gnocchi.resource.update(resource_type, resource['id'],
                                          {'ended_at': ended_at})
        except gnocchi_exc.NotFound:
            LOG.debug("Delete event received on unexiting resource (%s), "
                      "ignore it.", resource['id'])
        except Exception:
            LOG.error(_LE("Fail to update the resource %s"), resource,
                      exc_info=True)
        else:
            LOG.debug('Resource %s ended at %s', resource['id'], ended_at)
